// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Poco/Runnable.h>
#include <Storages/Page/PageDefinesBase.h>
#include <Storages/Page/workload/PSStressEnv.h>

#include <random>
#include <utility>

namespace DB::PS::tests
{
// Default value of the `max_page_id` member of PSWriter and PSReader.
static constexpr PageIdU64 MAX_PAGE_ID_DEFAULT = 1000;
// Common base class of the stress-test workers in this header.
// It is a Poco::Runnable that carries a logger and two usage counters;
// subclasses provide `description()` and `runImpl()`.
class PSRunnable : public Poco::Runnable
{
public:
    // `log` is taken by value and moved into `logger`, so the pointer is
    // not copied a second time.
    explicit PSRunnable(LoggerPtr log)
        : logger(std::move(log))
    {}

    // Poco::Runnable entry point; defined out of line.
    void run() override;

    // Defined out of line.
    // NOTE(review): presumably return `bytes_used` / `pages_used` -- confirm in the .cpp.
    size_t getBytesUsed() const;
    size_t getPagesUsed() const;

    // A printable label for this worker.
    virtual String description() = 0;
    // The work done by this worker.
    // NOTE(review): the meaning of the returned bool is decided by `run()` in the .cpp -- confirm there.
    virtual bool runImpl() = 0;

    // Usage counters; both start at zero.
    size_t bytes_used = 0;
    size_t pages_used = 0;

    LoggerPtr logger;
};

// The random page id for adding and removing
struct RandomPageId
{
    // The new page id to add to pagestorage
    DB::PageIdU64 page_id;
    // The page ids to be removed from pagestorage
    DB::PageIdU64Set page_id_to_remove;

    // Only adds `new_page_id`; `page_id_to_remove` stays empty.
    explicit RandomPageId(DB::PageIdU64 new_page_id)
        : page_id(new_page_id)
    {}

    // Adds `new_page_id` and removes every id in `page_id_to_remove_`.
    // The set is a by-value sink parameter, so it is moved into the member
    // instead of being copied a second time.
    RandomPageId(DB::PageIdU64 new_page_id, DB::PageIdU64Set page_id_to_remove_)
        : page_id(new_page_id)
        , page_id_to_remove(std::move(page_id_to_remove_))
    {}
};

// The status shared by the workers of one workload
struct GlobalStat
{
    // Shared status between PSWindowWriter and PSWindowReader.
    // NOTE(review): presumably this mutex guards `commit_ids` and
    // `pending_remove_ids` -- confirm against the .cpp.
    std::mutex mtx_page_id;
    // A page id is readable when it lies in [left_id_boundary, right_id_boundary),
    // exists in `commit_ids` and does not exist in `pending_remove_ids`.
    std::atomic<DB::PageIdU64> right_id_boundary = 0;
    std::atomic<DB::PageIdU64> left_id_boundary = 0;
    std::set<DB::PageIdU64> commit_ids;
    std::set<DB::PageIdU64> pending_remove_ids;

    // Defined out of line.
    // NOTE(review): presumably records `c.page_id` as committed and handles
    // `c.page_id_to_remove` -- confirm in the .cpp.
    void commit(const RandomPageId & c);
};

// Base writer of the stress test. It owns a random generator, a data buffer
// and a buffer size range; page id generation and data generation are
// virtual so that subclasses can customize them.
class PSWriter : public PSRunnable
{
public:
    // `global_stat_` is stored as a reference to the caller's unique_ptr (not
    // a copy), so that unique_ptr must outlive this writer.
    PSWriter(
        const PSPtr & ps_,
        DB::UInt32 index_,
        const std::unique_ptr<GlobalStat> & global_stat_,
        const LoggerPtr & log)
        : PSRunnable(log)
        , ps(ps_)
        , index(index_)
        , global_stat(global_stat_)
    {
        // `time(nullptr)` only has one-second resolution, so writers created
        // within the same second would all get the same seed and generate the
        // same random sequence. Mixing in the writer index keeps the seeds of
        // different writers distinct.
        gen.seed(static_cast<std::mt19937::result_type>(time(nullptr)) + index_);
    }

    // `memory` is a std::unique_ptr and releases its buffer by itself.
    ~PSWriter() override = default;

    String description() override { return fmt::format("(Stress Test Writer {})", index); }

    // NOTE(review): presumably sets `buffer_size_min` / `buffer_size_max` -- confirm in the .cpp.
    void setBufferSizeRange(size_t min, size_t max);

    // Produces the data for the next write; defined out of line.
    virtual DB::ReadBufferPtr getRandomData();

    bool runImpl() override;

    void write(const RandomPageId & r);

protected:
    // Picks the page id for the next write; defined out of line.
    virtual RandomPageId genRandomPageId();

protected:
    PSPtr ps;
    DB::UInt32 index = 0;
    std::mt19937 gen;
    DB::PageIdU64 max_page_id = MAX_PAGE_ID_DEFAULT;
    std::unique_ptr<char[]> memory;

    // Both bounds default to 1 MiB.
    size_t buffer_size_min = 1 * 1024 * 1024;
    size_t buffer_size_max = 1 * 1024 * 1024;

    // Reference to the unique_ptr owned by the caller, see the constructor.
    const std::unique_ptr<GlobalStat> & global_stat;
};


// PSCommonWriter can customize the data size, the number of buffers and the
// page id range used in one write batch.
// It can also set a max IO limit: after that amount of data has been sent
// into the page file, it will stop itself.
class PSCommonWriter : public PSWriter
{
public:
    // Forwards every argument to PSWriter.
    PSCommonWriter(
        const PSPtr & ps_,
        DB::UInt32 index_,
        const std::unique_ptr<GlobalStat> & global_stat_,
        const LoggerPtr & log)
        : PSWriter(ps_, index_, global_stat_, log)
    {}

    DB::ReadBufferPtr getRandomData() override;

    String description() override { return fmt::format("(Stress Test Common Writer {})", index); }

    bool runImpl() override;

    // The setters below are defined out of line.
    // NOTE(review): presumably they set `batch_buffer_nums`, `batch_buffer_limit`,
    // `max_page_id` and `data_sizes` respectively -- confirm in the .cpp.
    void setBatchBufferNums(size_t numbers);

    void setBatchBufferLimit(size_t size_limit);

    void setBatchBufferPageRange(size_t max_page_id_);

    void setFieldSize(const DB::PageFieldSizes & data_sizes);

protected:
    size_t batch_buffer_nums = 100;
    // NOTE(review): 0 presumably means "no limit" -- confirm in runImpl().
    size_t batch_buffer_limit = 0;

    // Empty by default.
    DB::PageFieldSizes data_sizes = {};
};

// PSWindowWriter can better simulate the user's workload in cooperation with PSWindowReader.
// It can also be used as an independent writer to imitate user writing.
// When the user is using TiFlash, the page ids in PageStorage should be continuously increasing.
// In the meantime, the page ids near the end may be constantly updated. So the page ids updated by
// this class look like:
//
//
//  | Pageid 1          Pageid 100                           Pageid N |
//  |-----------------------------------------------------------------|
//                                                           |              |
//                                                           |    window    |
//
// Every time, the page id to write is generated by a uniform dist.
// NOTE(review): the setter below is named after a normal distribution -- confirm
// which distribution `genRandomPageId` really uses.
// When the random number is bigger than Pageid N, it will be Pageid N + 1, which means a new page comes.
// When the random number is smaller than Pageid N, it means the page is updated.
// And `genRandomPageId` will also return the page ids that are not likely to be updated later.
class PSWindowWriter : public PSCommonWriter
{
public:
    // Forwards every argument to PSCommonWriter.
    PSWindowWriter(
        const PSPtr & ps_,
        DB::UInt32 index_,
        const std::unique_ptr<GlobalStat> & global_stat_,
        const LoggerPtr & log)
        : PSCommonWriter(ps_, index_, global_stat_, log)
    {}

    String description() override { return fmt::format("(Stress Test Window Writer {})", index); }

    // Defined out of line.
    // NOTE(review): presumably sets the `sigma` member -- confirm in the .cpp.
    void setNormalDistributionSigma(size_t sigma);

protected:
    RandomPageId genRandomPageId() override;

protected:
    size_t window_size = 100;
    size_t sigma = 9;
};

// A PSCommonWriter with its own `runImpl` and `genRandomPageId`, working on
// the page ids described by `begin_page_id` and `end_page_id`.
// NOTE(review): judging by the name, page ids are presumably written in
// increasing order -- confirm in the .cpp.
class PSIncreaseWriter : public PSCommonWriter
{
public:
    // Forwards every argument to PSCommonWriter.
    PSIncreaseWriter(
        const PSPtr & ps_,
        DB::UInt32 index_,
        const std::unique_ptr<GlobalStat> & global_stat_,
        const LoggerPtr & log)
        : PSCommonWriter(ps_, index_, global_stat_, log)
    {}

    String description() override { return fmt::format("(Stress Test Increase Writer {})", index); }

    bool runImpl() override;

    // Defined out of line.
    // NOTE(review): presumably derives `begin_page_id` / `end_page_id` from
    // `page_range` -- confirm in the .cpp.
    void setPageRange(size_t page_range);

protected:
    RandomPageId genRandomPageId() override;

protected:
    // Both default to 1.
    size_t begin_page_id = 1;
    size_t end_page_id = 1;
};

// Base reader of the stress test. It owns a random generator and the read
// settings; the selection of page ids is virtual so that subclasses can
// customize it.
class PSReader : public PSRunnable
{
public:
    // `global_stat_` is stored as a reference to the caller's unique_ptr (not
    // a copy), so that unique_ptr must outlive this reader.
    PSReader(
        const PSPtr & ps_,
        DB::UInt32 index_,
        const std::unique_ptr<GlobalStat> & global_stat_,
        const LoggerPtr & log)
        : PSRunnable(log)
        , ps(ps_)
        , index(index_)
        , global_stat(global_stat_)
    {
        // `time(nullptr)` only has one-second resolution, so readers created
        // within the same second would all get the same seed and generate the
        // same random sequence. Mixing in the reader index keeps the seeds of
        // different readers distinct.
        gen.seed(static_cast<std::mt19937::result_type>(time(nullptr)) + index_);
    }

    String description() override { return fmt::format("(Stress Test PSReader {})", index); }

    bool runImpl() override;

    // The setters below are defined out of line.
    // NOTE(review): presumably they set `heavy_read_delay_ms`, `max_page_id`
    // and `num_pages_read` respectively -- confirm in the .cpp.
    void setReadDelay(size_t delay_ms);

    void setReadPageRange(size_t max_page_id);

    void setReadPageNums(size_t page_read_once);

protected:
    // Picks the page ids for the next read; defined out of line.
    virtual DB::PageIdU64s genRandomPageIds();

protected:
    PSPtr ps;
    std::mt19937 gen;
    size_t heavy_read_delay_ms = 0;
    size_t num_pages_read = 5;
    DB::UInt32 index = 0;
    DB::PageIdU64 max_page_id = MAX_PAGE_ID_DEFAULT;
    // Reference to the unique_ptr owned by the caller, see the constructor.
    const std::unique_ptr<GlobalStat> & global_stat;
};

// PSWindowReader can better simulate the user's workload in cooperation with PSWindowWriter.
// It can also be used as an independent reader to imitate user reading.
// Same as PSWindowWriter, it contains a "window".
// And the page id will be randomly generated from this window (random in accordance with the normal distribution)
//
//  | Pageid 1          Pageid 100                           Pageid N |
//  |-----------------------------------------------------------------|
//                                                     |              |
//                                                     |    window    |
// The "window" will move backwards following Pageid N.
// Different from PSWindowWriter, the window of PSWindowReader should stay below Pageid N.
// In this case, the page ids that PSWindowReader needs to read will never disappear.
class PSWindowReader : public PSReader
{
public:
    // Forwards every argument to PSReader.
    PSWindowReader(
        const PSPtr & ps_,
        DB::UInt32 index_,
        const std::unique_ptr<GlobalStat> & global_stat_,
        const LoggerPtr & log)
        : PSReader(ps_, index_, global_stat_, log)
    {}

    // Defined out of line.
    // NOTE(review): presumably sets the `sigma` member -- confirm in the .cpp.
    void setNormalDistributionSigma(size_t sigma);

protected:
    DB::PageIdU64s genRandomPageIds() override;

protected:
    size_t sigma = 11;
    // No `gen` member is declared here on purpose: a second `std::mt19937 gen`
    // would hide `PSReader::gen`, which the PSReader constructor seeds, with
    // a generator that this header never seeds. Unqualified `gen` in this
    // class resolves to the inherited one.
};

// PSSnapshotReader is specially designed for holding snapshots.
// To verify the issue: https://github.com/pingcap/tics/issues/2726
// Please run it in a single thread.
class PSSnapshotReader : public PSReader
{
public:
    // Forwards every argument to PSReader.
    PSSnapshotReader(
        const PSPtr & ps_,
        DB::UInt32 index_,
        const std::unique_ptr<GlobalStat> & global_stat_,
        const LoggerPtr & log)
        : PSReader(ps_, index_, global_stat_, log)
    {}

    bool runImpl() override;

    // Defined out of line.
    // NOTE(review): presumably sets `snapshot_get_interval_ms` -- confirm in the .cpp.
    void setSnapshotGetIntervalMs(size_t snapshot_get_interval_ms_);

protected:
    // NOTE(review): no setter for `snapshots_hold_num` is visible in this
    // header; how it is used is up to the .cpp -- confirm there.
    size_t snapshots_hold_num = 0;
    size_t snapshot_get_interval_ms = 0;
    // The snapshots held by this reader.
    std::list<DB::PageStorage::SnapshotPtr> snapshots;
};
} // namespace DB::PS::tests
